package com.oracle.violet.first.etl

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark batch job that converts comma-separated ad-log text files into a Parquet dataset.
  *
  * Usage: `SwitchParqust <dataInputPath> <dataOutputPath>`
  *
  * Every input line is split on `,` (trailing empty fields are kept). Lines with fewer
  * fields than the schema has columns are dropped. The remaining lines are converted to
  * typed rows according to [[logSchema]] and written as snappy-compressed Parquet to
  * `dataOutputPath`; an existing output directory is deleted first.
  */
object SwitchParqust {

  /** Usage text printed when the argument list is invalid. */
  private val usage: String =
    """
      |com.oracle.violet.first
      |dataInputPath
      |dataOutputPath
    """.stripMargin

  /**
    * Schema of the output table. Column `i` is read from field `i` of the split input line,
    * and the declared type selects how the raw string is converted (see [[convert]]).
    */
  private val logSchema: StructType = StructType(
    Seq(
      StructField("sessionid", StringType),
      StructField("advertisersid", IntegerType),
      StructField("adorderid", IntegerType),
      StructField("adcreativeid", IntegerType),
      StructField("adplatformproviderid", IntegerType),
      StructField("sdkversion", StringType),
      StructField("adplatformkey", StringType),
      StructField("putinmodeltype", IntegerType),
      StructField("requestmode", IntegerType),
      StructField("adprice", DoubleType),
      StructField("adppprice", DoubleType),
      StructField("requestdate", StringType),
      StructField("ip", StringType),
      StructField("appid", StringType),
      StructField("appname", StringType),
      StructField("uuid", StringType),
      StructField("device", StringType),
      StructField("client", IntegerType),
      StructField("osversion", StringType),
      StructField("density", StringType),
      StructField("pw", IntegerType),
      StructField("ph", IntegerType),
      StructField("long", StringType),
      StructField("lat", StringType),
      StructField("provincename", StringType),
      StructField("cityname", StringType),
      StructField("ispid", IntegerType),
      StructField("ispname", StringType),
      StructField("networkmannerid", IntegerType),
      StructField("networkmannername", StringType),
      StructField("iseffective", IntegerType),
      StructField("isbilling", IntegerType),
      StructField("adspacetype", IntegerType),
      StructField("adspacetypename", StringType),
      StructField("devicetype", IntegerType),
      StructField("processnode", IntegerType),
      StructField("apptype", IntegerType),
      StructField("district", StringType),
      StructField("paymode", IntegerType),
      StructField("isbid", IntegerType),
      StructField("bidprice", DoubleType),
      StructField("winprice", DoubleType),
      StructField("iswin", IntegerType),
      StructField("cur", StringType),
      StructField("rate", DoubleType),
      StructField("cnywinprice", DoubleType),
      StructField("imei", StringType),
      StructField("mac", StringType),
      StructField("idfa", StringType),
      StructField("openudid", StringType),
      StructField("androidid", StringType),
      StructField("rtbprovince", StringType),
      StructField("rtbcity", StringType),
      StructField("rtbdistrict", StringType),
      StructField("rtbstreet", StringType),
      StructField("storeurl", StringType),
      StructField("realip", StringType),
      StructField("isqualityapp", IntegerType),
      StructField("bidfloor", DoubleType),
      StructField("aw", IntegerType),
      StructField("ah", IntegerType),
      StructField("imeimd5", StringType),
      StructField("macmd5", StringType),
      StructField("idfamd5", StringType),
      StructField("openudidmd5", StringType),
      StructField("androididmd5", StringType),
      StructField("imeisha1", StringType),
      StructField("macsha1", StringType),
      StructField("idfasha1", StringType),
      StructField("openudidsha1", StringType),
      StructField("androididsha1", StringType),
      StructField("uuidunknow", StringType),
      StructField("userid", StringType),
      StructField("iptype", IntegerType),
      StructField("initbidprice", DoubleType),
      StructField("adpayment", DoubleType),
      StructField("agentrate", DoubleType),
      StructField("lrate", DoubleType),
      StructField("adxrate", DoubleType),
      StructField("title", StringType),
      StructField("keywords", StringType),
      StructField("tagid", StringType),
      StructField("callbackdate", StringType),
      StructField("channelid", StringType),
      StructField("mediatype", IntegerType)
    )
  )

  /** Column types in schema order; drives the per-field conversion in [[toRow]]. */
  private val columnTypes: Array[DataType] = logSchema.fields.map(_.dataType)

  /**
    * Converts one raw field to the value stored in the row for a column of `dataType`.
    *
    * The result type is declared as `Any` on purpose: it stops the compiler from widening
    * the integer branch to a double, which would no longer match an `IntegerType` column.
    *
    * @param raw      the field text as found in the input line
    * @param dataType the column type declared in [[logSchema]]
    * @return the `toIntPlus` / `toDoublePlus` result for numeric columns, otherwise `raw`
    */
  private def convert(raw: String, dataType: DataType): Any = {
    // Project-local implicits providing toIntPlus / toDoublePlus on String.
    import com.oracle.violet.first.bean.TypePlus._
    dataType match {
      case IntegerType => raw.toIntPlus
      case DoubleType  => raw.toDoublePlus
      case _           => raw
    }
  }

  /**
    * Builds a typed [[Row]] from the split fields of one log line.
    *
    * Field `i` always feeds column `i`. The previous hand-written mapping read `adorderid`
    * from field 3 and `adcreativeid` from field 2 while every other column used its own
    * index; that looked like an accidental transposition and is not reproduced here.
    *
    * @param fields the split input line; must contain at least as many entries as the schema
    * @return a row whose values line up with [[logSchema]]
    */
  private def toRow(fields: Array[String]): Row =
    Row.fromSeq(columnTypes.indices.map(i => convert(fields(i), columnTypes(i))))

  /**
    * Entry point.
    *
    * @param args exactly two elements: the input path of the text logs and the output path
    *             of the Parquet dataset. Any other argument count prints the usage text
    *             and exits with status 1.
    */
  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      println(usage)
      // A usage error is a failure; report it to the caller with a non-zero status.
      sys.exit(1)
    }
    // Receive the job parameters.
    val Array(dataInputPath, dataOutputPath) = args

    val conf = new SparkConf()
      .setAppName("日志转化")
      // Use Kryo as the serialization format.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Default to local mode for IDE runs, but let spark-submit's --master take precedence.
      .setIfMissing("spark.master", "local[*]")

    val sc = new SparkContext(conf)

    try {
      // Read the raw log lines and keep only those with enough fields for the schema.
      val lines: RDD[String] = sc.textFile(dataInputPath)
      val rows: RDD[Row] = lines
        .map(_.split(",", -1))
        .filter(_.length >= columnTypes.length)
        .map(toRow)

      val sqlContext = new SQLContext(sc)

      // Compression codec of the Parquet files. (spark.io.compression.codec only affects
      // Spark's internal data such as shuffle blocks, not the written files.)
      sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
      val frame = sqlContext.createDataFrame(rows, logSchema)

      // Remove a previous result so the write does not fail on an existing directory.
      // The file system is resolved from the path itself so that output locations outside
      // the default file system work as well.
      val outputPath = new Path(dataOutputPath)
      val fs: FileSystem = outputPath.getFileSystem(sc.hadoopConfiguration)
      if (fs.exists(outputPath)) {
        fs.delete(outputPath, true)
      }

      // Store the data at the requested output path (the same path that was just cleared).
      frame.write.parquet(dataOutputPath)
    } finally {
      // Always release the Spark context, even when the job fails.
      sc.stop()
    }
  }

}


